home
***
CD-ROM
|
disk
|
FTP
|
other
***
search
/
MACD 5
/
MACD 5.bin
/
workbench
/
tools
/
czesc_2
/
fifolib
/
fifo.c
< prev
next >
Wrap
C/C++ Source or Header
|
1992-04-24
|
10KB
|
505 lines
/*
* FIFO.C
*/
#include "defs.h"
void ReCalcReaderIdx(Fifo *);
void SignalEof(Fifo *);
void FixFiFlags(Fifo *);
LibCall void RequestFifo(FHan *, Message *, long);
void SetEOF(Fifo *);
long AvailWBufSpace(Fifo *);
__stkargs long BitTestSet(char *, short);
/*
* Open up a new fifo by name.
*/
LibCall FHan *
OpenFifo(name, bytes, flags)
char *name;
long bytes;
long flags;
{
Fifo *fifo;
FHan *fhan;
{ /* bytes a power of 2? */
unsigned long i = 8;
while (i) {
if (bytes == i)
break;
i = i << 1;
}
if (i == 0)
return(NULL);
}
if (fhan = AllocMem(sizeof(FHan), MEMF_PUBLIC | MEMF_CLEAR)) {
Forbid();
if (fifo = (Fifo *)FindName((MaxList *)&FifoList, name)) {
;
} else {
if (fifo = AllocMem(sizeof(Fifo) - sizeof(fifo->fi_Buf) + bytes + strlen(name) + 1, MEMF_PUBLIC | MEMF_CLEAR)) {
AddTail((MaxList *)&FifoList, &fifo->fi_Node);
NewList((MaxList *)&fifo->fi_HanList);
NewList((MaxList *)&fifo->fi_WrNotify);
NewList((MaxList *)&fifo->fi_RdNotify);
fifo->fi_BufSize = bytes;
fifo->fi_BufMask = bytes - 1;
fifo->fi_Node.ln_Name = (char *)fifo + (sizeof(Fifo) - sizeof(fifo->fi_Buf)) + bytes;
strcpy(fifo->fi_Node.ln_Name, name);
InitSemaphore(&fifo->fi_SigSem);
}
}
if (fifo) {
AddTail((MaxList *)&fifo->fi_HanList, (MaxNode *)&fhan->fh_Node);
fhan->fh_Fifo = fifo;
fhan->fh_Flags = flags;
fhan->fh_Msg.mn_ReplyPort = &fhan->fh_Port;
fhan->fh_Port.mp_Node.ln_Type = NT_MSGPORT;
fhan->fh_Port.mp_Flags = PA_SIGNAL;
fhan->fh_Port.mp_SigBit = SIGB_SINGLE;
NewList(&fhan->fh_Port.mp_MsgList);
if (flags & FIFOF_READ) {
++fifo->fi_RRefs;
fhan->fh_RIdx = fifo->fi_RIdx;
}
if (flags & FIFOF_WRITE) {
++fifo->fi_WRefs;
}
/*
if (flags & FIFOF_CLOSEOF)
fifo->fi_Flags |= FIFOF_CLOSEOF;
*/
if (flags & FIFOF_KEEPIFD)
fifo->fi_Flags |= FIFOF_KEEPIFD;
if (flags & FIFOF_RREQUIRED)
fifo->fi_Flags |= FIFOF_RREQUIRED;
++fifo->fi_ORefs;
FixFiFlags(fifo);
} else {
FreeMem(fhan, sizeof(FHan));
fhan = NULL;
}
Permit();
}
return(fhan);
}
/*
* Close a previously openned fifo
*/
LibCall void
CloseFifo(fhan, flags)
FHan *fhan;
long flags;
{
Fifo *fifo;
Forbid();
if (fifo = fhan->fh_Fifo) {
fhan->fh_Fifo = NULL; /* try to catch duplicate closes */
Remove((MaxNode *)&fhan->fh_Node);
--fifo->fi_ORefs;
if (fhan->fh_Flags & FIFOF_WRITE) {
if (flags & FIFOF_EOF)
fifo->fi_Flags |= FIFOF_CLOSEOF;
if (--fifo->fi_WRefs == 0) {
if (fifo->fi_Flags & FIFOF_CLOSEOF) {
SetEOF(fifo);
}
}
}
if (fhan->fh_Flags & FIFOF_READ) {
--fifo->fi_RRefs;
if (fifo->fi_RRefs == 0 && (fifo->fi_Flags & FIFOF_RREQUIRED)) {
Message *msg;
while (msg = RemHead((MaxList *)&fifo->fi_WrNotify))
ReplyMsg(msg);
}
}
if (fifo->fi_ORefs == 0) {
if ((fifo->fi_Flags & FIFOF_KEEPIFD) == 0 || fifo->fi_RIdx == fifo->fi_WIdx) {
Remove(&fifo->fi_Node);
FreeMem(fifo, sizeof(Fifo) - sizeof(fifo->fi_Buf) + fifo->fi_BufSize + strlen(fifo->fi_Node.ln_Name) + 1);
}
} else {
if (fhan->fh_Flags & FIFOF_WRITE)
SignalEof(fifo);
if (fhan->fh_Flags & FIFOF_READ)
ReCalcReaderIdx(fifo);
FixFiFlags(fifo);
}
FreeMem(fhan, sizeof(FHan));
}
Permit();
}
/*
* Read from a fifo. Block if necessary (and not FIFOF_NBIO)
*/
LibCall long
ReadFifo(fhan, pbuf, skip)
FHan *fhan;
char **pbuf;
long skip;
{
long n;
Fifo *fifo = fhan->fh_Fifo;
/*
* attempt to clear <skip> bytes
*/
while (skip > 0) {
long widx = fifo->fi_WIdx; /* snapshot widx */
long ridx = fhan->fh_RIdx;
long len;
if (ridx <= widx) {
len = widx - ridx;
} else {
len = fifo->fi_BufSize - ridx;
}
if (len == 0)
break;
if (len > skip)
len = skip;
n += len;
skip -= len;
Forbid();
fhan->fh_RIdx = (ridx + len) & fifo->fi_BufMask;
if (ridx == fifo->fi_RIdx)
ReCalcReaderIdx(fifo); /* update mast-idx/writer-waiters */
Permit();
}
/*
* return available data
*/
for (;;) {
long widx = fifo->fi_WIdx; /* snapshot widx */
long ridx = fhan->fh_RIdx;
if (ridx <= widx) {
n = widx - ridx;
} else {
n = fifo->fi_BufSize - ridx;
}
*pbuf = fifo->fi_Buf + ridx;
if (n == 0) {
/*
* EOF on a per-handle basis since it gets cleared after EOF
* is returned.
*/
if ((fhan->fh_Flags & FIFOF_EOF) || (fifo->fi_Flags & FIFOF_EOF)) {
/*fhan->fh_Flags &= ~FIFOF_EOF;*/
n = -1;
break;
}
if ((fhan->fh_Flags & FIFOF_NBIO) == 0) {
fhan->fh_Port.mp_SigTask = SysBase->ThisTask;
RequestFifo(fhan, &fhan->fh_Msg, FREQ_RPEND);
WaitPort(&fhan->fh_Port);
Remove((MaxNode *)&fhan->fh_Msg);
continue;
}
}
break;
}
return(n);
}
/*
* Write to a fifo
*/
LibCall long
WriteFifo(fhan, buf, bytes)
FHan *fhan;
char *buf;
long bytes;
{
long n = -1;
short wsigchk = 0;
short normal = 0;
Fifo *fifo = fhan->fh_Fifo;
if (fifo->fi_RRefs == 0 && (fifo->fi_Flags & FIFOF_RREQUIRED))
return(-1);
if (bytes < 0 || bytes > (fifo->fi_BufSize >> 1))
return(-2);
Forbid();
/*
* A normal FIFO uses fi_SigSem
* A non-normal FIFO cannot afford to block and uses fi_Lock
*/
if (fifo->fi_Flags & FIFOF_WRNORM)
normal = 1;
if (normal) {
ObtainSemaphore(&fifo->fi_SigSem);
} else if (BitTestSet(&fifo->fi_Lock, 0) != 0) {
Permit();
return(n);
}
{
n = 0;
for (;;) {
while (bytes) {
long ridx = fifo->fi_RIdx; /* snapshot ridx */
long len;
if (fifo->fi_WIdx < ridx) {
len = ridx - fifo->fi_WIdx;
if (len <= bytes) /* FIFO FULL */
break;
} else {
len = fifo->fi_BufSize - fifo->fi_WIdx;
if (len + ridx <= bytes) /* FIFO FULL */
break;
}
if (len > bytes)
len = bytes;
movmem(buf, fifo->fi_Buf + fifo->fi_WIdx, len);
buf += len;
n += len;
bytes -= len;
fifo->fi_WIdx = (fifo->fi_WIdx + len) & fifo->fi_BufMask;
if (fhan->fh_Flags & FIFOF_NORMAL)
wsigchk = 1;
}
/*
* if fifo is full and NBIO not set, then block and loop
*/
if (bytes && !(fhan->fh_Flags & FIFOF_NBIO)) {
fhan->fh_Port.mp_SigTask = SysBase->ThisTask;
RequestFifo(fhan, &fhan->fh_Msg, FREQ_WAVAIL);
WaitPort(&fhan->fh_Port);
Remove((MaxNode *)&fhan->fh_Msg);
continue;
}
break;
}
/*
* check for any blocked readers, data is probably now available
* so wake them up.
*/
if (wsigchk && fifo->fi_RdNotify.mlh_Head->mln_Succ) {
Message *msg;
while (msg = RemHead((MaxList *)&fifo->fi_RdNotify))
ReplyMsg(msg);
}
if (normal)
ReleaseSemaphore(&fifo->fi_SigSem);
else
fifo->fi_Lock = 0;
}
Permit();
return(n);
}
LibCall long
BufSizeFifo(fhan)
FHan *fhan;
{
return(fhan->fh_Fifo->fi_BufSize);
}
/*
* Calculate distance between fh->fh_RIdx and fifo->fi_RIdx, shortest
* wins. If none found then nothing changes due to initial best set
* to exactly the buffer size.
*
* If the master index is modified and writers are waiting, signal them.
*
* Called while Forbid() or otherwise reader-lockedout
*/
void
ReCalcReaderIdx(fifo)
Fifo *fifo;
{
FHan *fh;
long bestLen = fifo->fi_BufSize;
long ridx;
for (fh = fifo->fi_HanList.mlh_Head; fh->fh_Node.mln_Succ; fh = fh->fh_Node.mln_Succ) {
if (fh->fh_Flags & FIFOF_READ) {
long len = (fh->fh_RIdx - fifo->fi_RIdx) & fifo->fi_BufMask;
if (len < bestLen)
bestLen = len;
}
}
ridx = (fifo->fi_RIdx + bestLen) & fifo->fi_BufMask;
/*
* more buffer space available, wakeup writers?
*/
if (ridx != fifo->fi_RIdx) {
fifo->fi_RIdx = ridx;
if (fifo->fi_WrNotify.mlh_Head->mln_Succ) {
Message *msg;
if (AvailWBufSpace(fifo) >= (fifo->fi_BufSize >> 1)) {
while (msg = RemHead((MaxList *)&fifo->fi_WrNotify))
ReplyMsg(msg);
}
}
}
}
/*
* signal EOF to any blocked readers
*/
void
SignalEof(fifo)
Fifo *fifo;
{
FHan *fh;
if (fifo->fi_Flags & FIFOF_EOF) {
Message *msg;
Forbid();
while (msg = RemHead((MaxList *)&fifo->fi_RdNotify))
ReplyMsg(msg);
Permit();
}
}
/*
* SetEOF()
*/
void
SetEOF(fifo)
Fifo *fifo;
{
FHan *fh;
for (fh = fifo->fi_HanList.mlh_Head; fh->fh_Node.mln_Succ; fh = fh->fh_Node.mln_Succ)
fh->fh_Flags |= FIFOF_EOF;
fifo->fi_Flags |= FIFOF_EOF;
}
/*
* FIXME, if state change occurs in master fifo, must unblock anybody
* blocked due to previous information. XXX
*/
void
FixFiFlags(fifo)
Fifo *fifo;
{
FHan *fh;
long rflags = FIFOF_RDNORM | FIFOF_WRNORM;
fifo->fi_Flags &= ~rflags;
for (fh = fifo->fi_HanList.mlh_Head; fh->fh_Node.mln_Succ; fh = fh->fh_Node.mln_Succ) {
if ((fh->fh_Flags & FIFOF_NORMAL) == 0) {
if (fh->fh_Flags & FIFOF_READ)
rflags &= ~FIFOF_RDNORM;
if (fh->fh_Flags & FIFOF_WRITE)
rflags &= ~FIFOF_WRNORM;
}
}
fifo->fi_Flags |= rflags;
}
/*
* request message on event. Returns message immediately if event already
* satisfied.
*/
LibCall void
RequestFifo(fhan, msg, req)
FHan *fhan;
Message *msg;
long req;
{
Fifo *fifo = fhan->fh_Fifo;
Forbid();
switch(req) {
case FREQ_RPEND:
if ((fhan->fh_Flags & FIFOF_EOF) || fhan->fh_RIdx != fifo->fi_WIdx) {
ReplyMsg(msg);
} else {
msg->mn_Node.ln_Type = NT_MESSAGE;
AddTail((MaxList *)&fifo->fi_RdNotify, &msg->mn_Node);
}
break;
case FREQ_WAVAIL:
/*
* determine available buffer space, alert if more than 1/2 empty.
*
* check for broken pipe
*/
if (fifo->fi_RRefs == 0 && (fifo->fi_Flags & FIFOF_RREQUIRED)) {
ReplyMsg(msg);
} else if (AvailWBufSpace(fifo) >= (fifo->fi_BufSize >> 1)) {
ReplyMsg(msg);
} else {
msg->mn_Node.ln_Type = NT_MESSAGE;
AddTail((MaxList *)&fifo->fi_WrNotify, &msg->mn_Node);
}
break;
case FREQ_ABORT:
if (msg->mn_Node.ln_Type == NT_MESSAGE) { /* if not returned */
Remove(&msg->mn_Node); /* return it */
ReplyMsg(msg);
}
break;
}
Permit();
}
long
AvailWBufSpace(fifo)
Fifo *fifo;
{
long ridx = fifo->fi_RIdx;
long len;
if (fifo->fi_WIdx < ridx)
len = ridx - fifo->fi_WIdx;
else
len = fifo->fi_BufSize - fifo->fi_WIdx + ridx - 1;
return(len);
}